################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from typing import List, Optional

from pypaimon.catalog.catalog_environment import CatalogEnvironment
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
from pypaimon.read.read_builder import ReadBuilder
from pypaimon.schema.schema_manager import SchemaManager
from pypaimon.schema.table_schema import TableSchema
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.table.table import Table
from pypaimon.write.write_builder import BatchWriteBuilder, StreamWriteBuilder
from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
                                              FixedBucketRowKeyExtractor,
                                              PostponeBucketRowKeyExtractor,
                                              RowKeyExtractor,
                                              UnawareBucketRowKeyExtractor)


class FileStoreTable(Table):
    def __init__(self, file_io: FileIO, identifier: Identifier, table_path: str,
                 table_schema: TableSchema, catalog_environment: Optional[CatalogEnvironment] = None):
        self.file_io = file_io
        self.identifier = identifier
        self.table_path = table_path
        self.catalog_environment = catalog_environment or CatalogEnvironment.empty()

        self.table_schema = table_schema
        self.fields = table_schema.fields
        self.field_names = [field.name for field in table_schema.fields]
        self.field_dict = {field.name: field for field in self.fields}
        self.primary_keys = table_schema.primary_keys
        self.primary_keys_fields = [self.field_dict[name] for name in self.primary_keys]
        self.partition_keys = table_schema.partition_keys
        self.partition_keys_fields = [self.field_dict[name] for name in self.partition_keys]
        self.trimmed_primary_keys = [pk for pk in self.primary_keys if pk not in self.partition_keys]
        self.trimmed_primary_keys_fields = [self.field_dict[name] for name in self.trimmed_primary_keys]

        self.options = table_schema.options
        self.cross_partition_update = self.table_schema.cross_partition_update()
        self.is_primary_key_table = bool(self.primary_keys)
        self.total_buckets = int(table_schema.options.get(CoreOptions.BUCKET, -1))

        self.schema_manager = SchemaManager(file_io, table_path)

    def current_branch(self) -> str:
        """Get the current branch name from options."""
        return self.options.get(CoreOptions.BRANCH, "main")

    def snapshot_manager(self):
        """Get the snapshot manager for this table."""
        from pypaimon.snapshot.snapshot_manager import SnapshotManager
        return SnapshotManager(self)

    def path_factory(self) -> 'FileStorePathFactory':
        from pypaimon.utils.file_store_path_factory import FileStorePathFactory

        # Get external paths
        external_paths = self._create_external_paths()

        # Get format identifier
        format_identifier = CoreOptions.file_format(self.options)

        file_compression = CoreOptions.file_compression(self.options)

        return FileStorePathFactory(
            root=str(self.table_path),
            partition_keys=self.partition_keys,
            default_part_value="__DEFAULT_PARTITION__",
            format_identifier=format_identifier,
            data_file_prefix="data-",
            changelog_file_prefix="changelog-",
            legacy_partition_name=True,
            file_suffix_include_compression=False,
            file_compression=file_compression,
            data_file_path_directory=None,
            external_paths=external_paths,
            index_file_in_data_file_dir=False,
        )

    def new_snapshot_commit(self):
        """Create a new SnapshotCommit instance using the catalog environment."""
        return self.catalog_environment.snapshot_commit(self.snapshot_manager())

    def bucket_mode(self) -> BucketMode:
        if self.is_primary_key_table:
            if int(self.options.get(CoreOptions.BUCKET, -1)) == -2:
                return BucketMode.POSTPONE_MODE
            elif int(self.options.get(CoreOptions.BUCKET, -1)) == -1:
                if self.cross_partition_update:
                    return BucketMode.CROSS_PARTITION
                else:
                    return BucketMode.HASH_DYNAMIC
            else:
                return BucketMode.HASH_FIXED
        else:
            if int(self.options.get(CoreOptions.BUCKET, -1)) == -1:
                return BucketMode.BUCKET_UNAWARE
            else:
                return BucketMode.HASH_FIXED

    def new_read_builder(self) -> 'ReadBuilder':
        return ReadBuilder(self)

    def new_batch_write_builder(self) -> BatchWriteBuilder:
        return BatchWriteBuilder(self)

    def new_stream_write_builder(self) -> StreamWriteBuilder:
        return StreamWriteBuilder(self)

    def create_row_key_extractor(self) -> RowKeyExtractor:
        bucket_mode = self.bucket_mode()
        if bucket_mode == BucketMode.HASH_FIXED:
            return FixedBucketRowKeyExtractor(self.table_schema)
        elif bucket_mode == BucketMode.BUCKET_UNAWARE:
            return UnawareBucketRowKeyExtractor(self.table_schema)
        elif bucket_mode == BucketMode.POSTPONE_MODE:
            return PostponeBucketRowKeyExtractor(self.table_schema)
        elif bucket_mode == BucketMode.HASH_DYNAMIC or bucket_mode == BucketMode.CROSS_PARTITION:
            return DynamicBucketRowKeyExtractor(self.table_schema)
        else:
            raise ValueError(f"Unsupported bucket mode: {bucket_mode}")

    def copy(self, options: dict) -> 'FileStoreTable':
        if CoreOptions.BUCKET in options and options.get(CoreOptions.BUCKET) != self.options.get(CoreOptions.BUCKET):
            raise ValueError("Cannot change bucket number")
        new_options = self.options.copy()
        for k, v in options.items():
            if v is None:
                new_options.pop(k)
            else:
                new_options[k] = v
        new_table_schema = self.table_schema.copy(new_options=new_options)
        return FileStoreTable(self.file_io, self.identifier, self.table_path, new_table_schema,
                              self.catalog_environment)

    def add_options(self, options: dict):
        for key, value in options.items():
            self.options[key] = value

    def _create_external_paths(self) -> List[str]:
        from urllib.parse import urlparse
        from pypaimon.common.core_options import ExternalPathStrategy

        external_paths_str = CoreOptions.data_file_external_paths(self.options)
        if not external_paths_str:
            return []

        strategy = CoreOptions.external_path_strategy(self.options)
        if strategy == ExternalPathStrategy.NONE:
            return []

        specific_fs = CoreOptions.external_specific_fs(self.options)

        paths = []
        for path_string in external_paths_str:
            if not path_string:
                continue

            # Parse and validate path
            parsed = urlparse(path_string)
            scheme = parsed.scheme
            if not scheme:
                raise ValueError(
                    f"External path must have a scheme (e.g., oss://, s3://, file://): {path_string}"
                )

            # Filter by specific filesystem if strategy is specific-fs
            if strategy == ExternalPathStrategy.SPECIFIC_FS:
                if not specific_fs:
                    raise ValueError(
                        f"data-file.external-paths.specific-fs must be set when "
                        f"strategy is {ExternalPathStrategy.SPECIFIC_FS}"
                    )
                if scheme.lower() != specific_fs.lower():
                    continue  # Skip paths that don't match the specific filesystem

            paths.append(path_string)

        if not paths:
            raise ValueError("No valid external paths found after filtering")

        return paths
